热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

更多|本文_Springboot系列(三十二):Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Springboot系列(三十二):Springboot集成kafka(环境搭建+演示)|超级详细,建议收藏相关的知识,希望对你有一定的参考价

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Springboot系列(三十二):Springboot集成 kafka(环境搭建+演示)|超级详细,建议收藏相关的知识,希望对你有一定的参考价值。




👨‍🎓 作者:bug菌


🎉简介:在CSDN、掘金等社区优质创作者,全网合计6w粉+,对一切技术都感兴趣,重心偏java方向,目前运营公众号[猿圈奇妙屋],欢迎小伙伴们的加入,一起秃头。


🚫特别声明:原创不易,转载请附上原文出处链接和本文声明,谢谢配合。


🙏版权声明:文章里可能部分文字或者图片来源于互联网或者百度百科,如有侵权请联系bug菌处理。







【开发云】年年都是折扣价,不用四处薅羊毛


         嗨,家人们,我是bug菌呀,我又来啦。今天我们来聊点什么咧,OK,接着为大家更《springboot零基础入门教学》系列文章吧。希望能帮助更多的初学者们快速入门!



       小伙伴们在批阅文章的过程中如果觉得文章对您有一丝丝帮助,还请别吝啬您手里的赞呀,大胆的把文章
点亮👍吧,您的点赞三连(
收藏⭐️+关注👨‍🎓+留言📃)就是对bug菌我创作道路上最好的鼓励与支持😘。时光不弃🏃🏻‍♀️,创作不停💕,加油☘️

一、前言🔥

       上一期,我是带着大家入门了springboot集成RabbitMq,今天我再来一期kafka的零基础教学吧。不知道大家对kafka有多少了解,反正我就是从搭建开始,然后再加一个简单演示,这就算是带着大家了个门哈,剩下的我再后边慢慢出教程给大家说。


二、环境说明🔥



演示环境:idea2019.3 + springboot 2.3.1REALSE + windows10 + kafka



三、概念🔥

        kafka是linkedin开源的分布式发布-订阅消息系统,目前归属于Apache的顶级项目。主要特点是基于pull模式来处理消息消费,追求高吞吐量,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。

        一开始的目的是日志的收集和传输。0.8版本开始支持复制,不支持事务,对消息的丢失,重复,错误没有严格要求 适用于产生大量数据的互联网服务的数据收集业务。在廉价的服务器上都能有很高的性能,这个主要是基于操作系统底层的pagecache,不用内存胜似使用内存。

       综上所述,kafka是一款开源的消息引擎系统(消息队列/消息中间件) 分布式流处理平台。


四、Windows安装kafka🔥


1️⃣下载kafka安装包

下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz

下载完后是这么个东西:


2️⃣下载好后,进行解压


 3️⃣配置修改

        进入config目录,修改server.properties文件,把 log.dirs的值改成 ./logs


 4️⃣kafka启动

        在你的安装目录下的bin\\windows目录上直接输入cmd然后回车。

执行命令:输入

kafka-server-start.bat ../../config/server.properties

 然后可以看到控制台启动报错:

        可以看到,kafka还依赖于zookeeper。所以我们接下来再安装zookeeper,启动zookeeper后再启动kafka试试。


五、安装zookeeper🔥


1️⃣下载zookeeper安装包

        先下载zookeeper,下载地址:https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.7.0


 2️⃣解压

        下载完后进行解压,解压到指定的目录下;

        然后再将apache-zookeeper-3.7.0-bin也一并下载,进行解压完后,将目录下的lib文件夹复制到apache-zookeeper-3.7.0主目录下:

 否则后续启动肯定会报错。


 3️⃣修改配置

        修改zoo_sample.cfg 文件名(./conf) 为 zoo.cfg

        同样再编辑它指定日志位置,具体配置文件如下:(这里使用notepad++小绿本进行编辑);我这里是指定在同级目录下了。这个你根据实际情况而定。

         具体添加如下:仅供参考复制。

#原目录;直接注释掉
#dataDir=/tmp/zookeeper
#指定新目录
#保存数据的目录
dataDir=./data
# 保存日志文件的目录
dataLogDir=./log

4️⃣启动zookeeper

        进入到bin目录,并且启动zkServer.cmd,这个脚本中会启动一个java进程。

        如果你们遇到这个报错,请看上边第2点zookeeper解压。报错是因为找不到类包缺少lib这个jar包文件夹,所以你得下载bin包并把lib依赖都复制过来。

输入如下命名进行启动zookeeper

zkServer.cmd

具体启动如下:

 正常启动截图:

        再启动后jps可以看到QuorumPeerMain的进程。直接win+R 输入cmd然后再输入如下命令即可进行查询。

启动命令如下:

jps -l -v

启动客户端连接一下:

进入到/bin目录下,执行如下命令:

zkCli.cmd 127.0.0.1:2181

        如上可以看到, zookeeper启动ok。至此,zookeeper就安装完成啦。

        所以我们再来启动一下kafka看看,是否还会跟刚才一样报错。

        切记不要关zookeeper启动服务小黑窗,也就是你执行那串命令的窗口,若是关了你再重新启动即可。

zkServer.cmd

kafka启动成功截图:


六、kafka项目集成🔥


1️⃣pom引入



org.springframework.kafka
spring-kafka

2️⃣配置kafka

#配置kafka 服务器
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false

3️⃣topic初始化

package com.example.demo.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* topic初始化
*
* @author luoYong
* @version 1.0
* @date 2022/2/28 17:39
*/
@Configurationpublic class KafkaConfig
/**
* 创建一个名为topic.test的Topic并设置分区数为8,分区副本数为2
*/
@Bean public NewTopic initialTopic()
return new NewTopic("topic.test", 8, (short) 2);


4️⃣定义一个kafka消息发送端

package com.example.demo.component.kafka;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* kafka消息发送端
*
* @author luoYong
* @version 1.0
* @date 2022/2/28 17:40
*/
@Component
@Slf4j
public class KafkaProducer
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(Object obj)
String obj2String = JSONObject.toJSONString(obj);
// 发送消息
kafkaTemplate.send("topic.test", obj).addCallback(new ListenableFutureCallback>()
@Override
public void onFailure(Throwable throwable)
// 发送失败的处理
log.info("topic[] 生产者 发送消息失败[]", "topic.test", throwable.getMessage());

@Override
public void onSuccess(SendResult stringObjectSendResult)
// 成功的处理
log.info("topic[] 生产者 发送消息成功[]", "topic.test", stringObjectSendResult.getProducerRecord().value());

);


5️⃣定义一个kafka消息消费端

package com.example.demo.component.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* 消息接收端-支持多端消费
*
* @author luoYong
* @version 1.0
* @date 2022/2/28 17:42
*/
@Component@Slf4jpublic class KafkaConsumer
@KafkaListener(topics = "topic.test", groupId = "topic.group1")
public void topicTest(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
Optional message = Optional.ofNullable(record.value());
if (message.isPresent())
Object msg = message.get();
log.info("客户端 A 消费了: Topic[] Message[]", topic, msg);
ack.acknowledge();


@KafkaListener(topics = "topic.test", groupId = "topic.group1")
public void topicTest1(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
Optional message = Optional.ofNullable(record.value());
if (message.isPresent())
Object msg = message.get();
log.info("客户端 B 消费了: Topic[] Message[]", topic, msg);
ack.acknowledge();



6️⃣定义一个测试类进行测试

/**
* @author luoYong
* @version 1.0
* @date 2022/2/24 17:02
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaTest
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testSendMsg()
String msg = "hello";
kafkaProducer.send(msg);


7️⃣测试结果如下

       ... ...

       ok,以上就是我这期的全部内容啦,如果还想学习更多,你可以看看我的往期热文推荐哦,每天积累一个奇淫小知识,日积月累下去,你一定能成为令人敬仰的大佬的。好啦,咱们下期见~


七、往期推荐🔥


  • springboot系列(一):如何创建springboot项目及启动
  • springboot系列(二):yaml、properties两配置文件介绍及使用

  • springboot系列(三):多环境切换,实例演示
  • springboot系列(四):stater入门
  • springboot系列(五):史上最最最全springboot常用注解
  • springboot系列(六):mysql配置及数据库查询
  • springboot系列(七):如何通过mybatis-plus实现接口增删改查
  • springboot系列(八):mybatis-plus之条件构造器使用手册
  • springboot系列(九):mybatis-plus之如何自定义sql
  • springboot系列(十):mybatis之xml映射文件>、<=等特殊符号写法
  • springboot系列(十一):实现多数据源配置,开箱即用
  • springboot系列(十二):如何实现邮件发送提醒,你一定得会(准备篇)
  • springboot系列(十三):如何实现发送普通邮件?你一定得会
  • springboot系列(十四):如何实现发送图片、doc文档等附件邮件?你一定得会
  • springboot系列(十五):如何实现静态邮件模板发送?你一定得会
  • springboot系列(十六):如何实现发送邮件提醒,附完整源码
  • springboot系列(十七):集成在线接口文档Swagger2
  • springboot系列(十八):如何Windows安装redis?你玩过么
  • springboot系列(十九):如何集成redis?不会我教你
  • springboot系列(二十):如何通过redis实现手机号验证码功能 
  • ... ...

八、文末🔥

        如果还想要学习更多,小伙伴们可关注bug菌专门为大家创建的专栏《springboot零基础入门教学》,从无到有,从零到一!希望能帮助到更多小伙伴们。






【开发云】年年都是折扣价,不用四处薅羊毛


       我是bug菌,一名想走👣出大山改变命运的程序猿。接下来的路还很长,都等待着我们去突破、去挑战。来吧,小伙伴们,我们一起加油!未来皆可期,fighting!

        最后送大家两句我很喜欢的话,与诸君共勉!



☘️做你想做的人,没有时间限制,只要愿意,什么时候都可以start。

🍀你能从现在开始改变,也可以一成不变,这件事,没有规矩可言,你可以活出最精彩的自己。



​​​



💌如果文章对您有所帮助,就请留下您的吧!(#^.^#);


💝如果喜欢bug菌分享的文章,就请给bug菌点个关注吧!(๑′ᴗ‵๑)づ╭❤~;


💗如果对文章有任何疑问,还请文末留言或者加群吧;


💞鉴于个人经验有限,所有观点及技术研点,如有异议,请直接回复参与讨论(请勿发表攻击言论,谢谢);


💕版权声明:原创不易,转载请附上原文出处链接和本文声明,版权所有,盗版必究!!!谢谢。



推荐阅读
  • 本文整理了一份基础的嵌入式Linux工程师笔试题,涵盖填空题、编程题和简答题,旨在帮助考生更好地准备考试。 ... [详细]
  • Spring 中策略模式的应用:Resource 接口详解
    本文探讨了在 Spring 框架中如何利用 Resource 接口实现资源访问策略。Resource 接口作为资源访问策略的抽象,通过多种实现类支持不同类型的资源访问。 ... [详细]
  • 本文为初学者提供了一条清晰的学习路线,帮助他们逐步成长为优秀的Web开发人员。通过十个关键步骤,涵盖从基础到高级的各个方面,确保每位学习者都能找到适合自己的学习方向。 ... [详细]
  • 阿里云 Aliplayer高级功能介绍(八):安全播放
    如何保障视频内容的安全,不被盗链、非法下载和传播,阿里云视频点播已经有一套完善的机 ... [详细]
  • 本文详细介绍如何在忘记MySQL服务器密码的情况下进行密码重置,包括具体的步骤和注意事项。 ... [详细]
  • 本文详细介绍了 Java 网站开发的相关资源和步骤,包括常用网站、开发环境和框架选择。 ... [详细]
  • 本文详细介绍了Java代码分层的基本概念和常见分层模式,特别是MVC模式。同时探讨了不同项目需求下的分层策略,帮助读者更好地理解和应用Java分层思想。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • Android 构建基础流程详解
    Android 构建基础流程详解 ... [详细]
  • 本文探讨了 TypeScript 中泛型的重要性和应用场景,通过多个实例详细解析了泛型如何提升代码的复用性和类型安全性。 ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • 本文介绍了Spring 2.0引入的TaskExecutor接口及其多种实现,包括同步和异步执行任务的方式。文章详细解释了如何在Spring应用中配置和使用这些线程池实现,以提高应用的性能和可管理性。 ... [详细]
  • javax.mail.search.BodyTerm.matchPart()方法的使用及代码示例 ... [详细]
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
author-avatar
历史本轻狂
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有